package com.raorao.rpc.client.discovery;

import com.alibaba.fastjson.JSON;
import com.raorao.rpc.common.constants.RpcConstant;
import com.raorao.rpc.common.serializer.ZookeeperSerializer;
import com.raorao.rpc.common.utils.EventPushUtil;
import com.raorao.rpc.event.constants.ServiceChangeStatusEnum;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import com.raorao.rpc.common.service.ServiceInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;

import java.net.URLDecoder;
import java.util.*;

/**
 * Service discoverer that uses ZooKeeper as the registry.
 *
 * <p>The node layout assumed by the path handling in this class is
 * {@code <root>/<serviceName>/<infoDir>/<providerNode>}, where {@code <root>} is
 * {@link RpcConstant#ZK_SERVICE_PATH}, {@code <infoDir>} is {@link RpcConstant#PATH_INFO_PATH}
 * and {@code <providerNode>} is a URL-encoded JSON document describing one provider.
 * The code relies on the root being a single path segment (service name is always the
 * third element after splitting on the delimiter).
 *
 * <p>The provider cache is read by RPC callers and updated from ZooKeeper child-change
 * callbacks, so every method that touches the cache is {@code synchronized} on this instance.
 *
 * @author raorao
 * @since 1.0.0
 */
public class ZookeeperServiceDiscoverer implements ServiceDiscoverer {

    private static final Logger logger = LoggerFactory.getLogger(ZookeeperServiceDiscoverer.class);

    /** Path suffix that leads from a service node to the node holding its providers. */
    private static final String PROVIDER_DIR_SUFFIX = RpcConstant.PATH_DELIMITER + RpcConstant.PATH_INFO_PATH;

    /** Index of the service name after splitting an absolute path on the delimiter. */
    private static final int SERVICE_NAME_INDEX = 2;

    /** Index of the provider node name after splitting an absolute path on the delimiter. */
    private static final int PROVIDER_NODE_INDEX = 4;

    /**
     * ZooKeeper connection address this discoverer was created with.
     */
    private final String address;

    /**
     * ZooKeeper client.
     */
    private final ZkClient zkClient;

    /**
     * Provider cache: service name -> known providers. {@code null} until
     * {@link #initServices()} has completed successfully.
     */
    private Map<String, List<ServiceInfo>> serviceInfoMap;

    /**
     * Paths that already have a child listener registered, so that repeated change
     * callbacks do not stack additional listeners on the same path.
     */
    private final Set<String> watchedPaths = Collections.synchronizedSet(new HashSet<>());

    /**
     * Creates the discoverer and connects a ZooKeeper client to the given address.
     *
     * @param zkAddress ZooKeeper connection string
     */
    public ZookeeperServiceDiscoverer(String zkAddress) {
        this.address = zkAddress;
        // 1. Create the zk client
        this.zkClient = new ZkClient(zkAddress);
        this.zkClient.setZkSerializer(new ZookeeperSerializer());
    }

    /**
     * Looks up the providers of a service, loading the cache from ZooKeeper on first use.
     * Service name format: fully qualified interface name.
     *
     * @param name service name
     * @return a snapshot of the known providers; empty (never {@code null}) when the service is unknown
     */
    @Override
    public synchronized List<ServiceInfo> getServices(String name) {
        // 1. Make sure the provider cache is loaded
        this.initServices();
        // 2. Hand out a copy: the cached list is mutated by ZooKeeper change callbacks
        List<ServiceInfo> providers = this.serviceInfoMap.get(name);
        return providers == null ? new ArrayList<>() : new ArrayList<>(providers);
    }

    /**
     * Loads all services and their providers from ZooKeeper and registers child listeners.
     * Does nothing when the cache has already been loaded.
     *
     * <p>The cache is built in a local map and published only once fully populated, so a
     * failure while reading ZooKeeper leaves the discoverer uninitialized and the next call
     * retries.
     */
    @Async("rpcExecutor")
    @Override
    public synchronized void initServices() {
        if (this.serviceInfoMap != null) {
            return;
        }
        String servicePath = RpcConstant.ZK_SERVICE_PATH;
        Map<String, List<ServiceInfo>> loaded = new HashMap<>();
        // 1. Register every service node under the root
        for (String service : this.childrenOf(servicePath)) {
            this.addServiceInfo(loaded, servicePath + RpcConstant.PATH_DELIMITER + service);
        }
        // 2. Load the providers of each service (iterate a key snapshot, the map is written below)
        for (String key : new ArrayList<>(loaded.keySet())) {
            String providerDir = servicePath + RpcConstant.PATH_DELIMITER + key + PROVIDER_DIR_SUFFIX;
            for (String provider : this.childrenOf(providerDir)) {
                this.addServiceInfo(loaded, providerDir + RpcConstant.PATH_DELIMITER + provider);
            }
        }
        this.serviceInfoMap = loaded;
        // 3. Install the listeners
        this.watchChlidren(servicePath);
    }

    /**
     * Returns the children of a node, or an empty list when the node does not exist.
     *
     * @param path absolute node path
     * @return child node names, never {@code null}
     */
    private List<String> childrenOf(String path) {
        if (!zkClient.exists(path)) {
            return Collections.<String>emptyList();
        }
        List<String> children = zkClient.getChildren(path);
        return children == null ? Collections.<String>emptyList() : children;
    }

    /**
     * Recursively registers child listeners on {@code path} and its descendants.
     * Provider nodes (paths containing {@code /<infoDir>/}) get no listener.
     * A path is subscribed at most once, however often this method visits it.
     *
     * @param path absolute node path to start from
     */
    private void watchChlidren(String path) {
        // 1. Skip provider nodes; subscribe only once per path
        boolean providerNode = path.contains(PROVIDER_DIR_SUFFIX + RpcConstant.PATH_DELIMITER);
        if (!providerNode && watchedPaths.add(path)) {
            logger.debug("【rpc zookeeper节点监听】path={}", path);
            // 1.1 Listen for child changes on this node
            zkClient.subscribeChildChanges(path, new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    // 1.2 Apply the change to the cache, then make sure new children are watched
                    changeService(parentPath, currentChilds);
                    if (currentChilds != null) {
                        for (String child : currentChilds) {
                            watchChlidren(parentPath + RpcConstant.PATH_DELIMITER + child);
                        }
                    }
                }
            });
        }
        // 2. Descend into the existing children
        for (String child : this.childrenOf(path)) {
            watchChlidren(path + RpcConstant.PATH_DELIMITER + child);
        }
    }

    /**
     * Applies a child-change notification to the provider cache.
     *
     * <p>A two-segment parent (the root) means the set of services changed: services that
     * disappeared get their provider list cleared. A four-segment parent (a provider
     * directory) means the providers of one service changed: vanished providers are dropped
     * and new ones are parsed and added; when no providers remain the service node is
     * deleted from ZooKeeper. Other depths are ignored. {@code currentChilds} is not modified.
     *
     * @param parentPath    path of the node whose children changed
     * @param currentChilds current child names, {@code null} when the parent no longer exists
     */
    private synchronized void changeService(String parentPath, List<String> currentChilds) {
        logger.debug("【rpc zookeeper节点变化】parentPath={}", parentPath);
        logger.debug("【rpc zookeeper节点变化】currentChilds={}", currentChilds);
        if (parentPath.isEmpty()) {
            return;
        }
        String[] segments = parentPath.split(RpcConstant.PATH_DELIMITER);
        if (segments.length == 2) {
            // 1. Service level: clear the providers of services that are gone
            for (Map.Entry<String, List<ServiceInfo>> entry : serviceInfoMap.entrySet()) {
                if (currentChilds == null || !currentChilds.contains(entry.getKey())) {
                    entry.setValue(new ArrayList<>());
                    logger.debug("【rpc 移除服务】name={}", entry.getKey());
                }
            }
        } else if (segments.length == 4) {
            // 2. Provider level
            String key = segments[SERVICE_NAME_INDEX];
            if (currentChilds == null || currentChilds.isEmpty()) {
                serviceInfoMap.put(key, new ArrayList<>());
                this.removeZkNode(parentPath);
                return;
            }
            // Work on a copy so the listener's list stays intact
            Set<String> pending = new LinkedHashSet<>(currentChilds);
            List<ServiceInfo> known = serviceInfoMap.get(key);
            if (known != null) {
                // 2.1 Drop providers that disappeared; the rest are not "new"
                Iterator<ServiceInfo> iterator = known.iterator();
                while (iterator.hasNext()) {
                    ServiceInfo info = iterator.next();
                    if (!pending.remove(info.getNodeName())) {
                        iterator.remove();
                        logger.debug("【rpc 移除服务提供者】name={}", info.getNodeName());
                    }
                }
            }
            // 2.2 Add new providers (also covers services that were not cached yet)
            for (String node : pending) {
                this.addServiceInfo(serviceInfoMap, parentPath + RpcConstant.PATH_DELIMITER + node);
            }
        }
    }

    /**
     * Deletes the service node that owns the given provider directory, recursively.
     * Only a trailing {@code /<infoDir>} is stripped, so service names that merely contain
     * that text are left untouched.
     *
     * @param parentPath path of the (now empty) provider directory
     */
    private void removeZkNode(String parentPath) {
        logger.debug("【rpc 删除无服务节点】path={}", parentPath);
        String servicePath = parentPath;
        if (servicePath.endsWith(PROVIDER_DIR_SUFFIX)) {
            servicePath = servicePath.substring(0, servicePath.length() - PROVIDER_DIR_SUFFIX.length());
        }
        zkClient.deleteRecursive(servicePath);
    }

    /**
     * Registers the service named in {@code path} and, when the path points at a provider
     * node, parses that node's name (URL-encoded JSON) into a {@link ServiceInfo}, adds it
     * to the service's provider list and publishes an ADD event.
     *
     * <p>Paths too short to contain a service name are ignored; paths without a provider
     * segment only register the service. Failures are logged and never propagated, so one
     * malformed node cannot break discovery of the others.
     *
     * @param serviceInfoMap cache to update
     * @param path           absolute path of a service node or a provider node
     */
    private void addServiceInfo(Map<String, List<ServiceInfo>> serviceInfoMap, String path) {
        try {
            String[] segments = path.split(RpcConstant.PATH_DELIMITER);
            if (segments.length <= SERVICE_NAME_INDEX) {
                return;
            }
            String key = segments[SERVICE_NAME_INDEX];
            List<ServiceInfo> providers = serviceInfoMap.get(key);
            if (providers == null) {
                logger.debug("【rpc 添加服务信息】path={}", path);
                providers = new ArrayList<>();
                serviceInfoMap.put(key, providers);
            }
            if (segments.length <= PROVIDER_NODE_INDEX) {
                return;
            }
            String nodeName = segments[PROVIDER_NODE_INDEX];
            String json = URLDecoder.decode(nodeName, RpcConstant.UTF_8);
            ServiceInfo provider = JSON.parseObject(json, ServiceInfo.class);
            if (provider == null) {
                logger.error("【rpc 服务提供者信息解析失败】path=" + path);
                return;
            }
            provider.setNodeName(nodeName);
            providers.add(provider);
            EventPushUtil.serviceChangeEvent(provider, ServiceChangeStatusEnum.ADD.getValue());
            logger.debug("【rpc 添加服务提供者信息】path={}", path);
        } catch (Exception ex) {
            // Boundary catch: node names are external data; keep the cause in the log
            logger.error("【rpc 服务提供者信息解析失败】path=" + path, ex);
        }
    }
}
